Skip to content

Commit 0d4eacf

Browse files
committed
reprocess failed account events
1 parent 626a465 commit 0d4eacf

File tree

5 files changed

+146
-19
lines changed

5 files changed

+146
-19
lines changed

kitsune/settings.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,6 +1127,7 @@ def filter_exceptions(event, hint):
11271127
DMS_FIX_CURRENT_REVISIONS = config("DMS_FIX_CURRENT_REVISIONS", default=None)
11281128
DMS_COHORT_ANALYSIS = config("DMS_COHORT_ANALYSIS", default=None)
11291129
DMS_UPDATE_L10N_CONTRIBUTOR_METRICS = config("DMS_UPDATE_L10N_CONTRIBUTOR_METRICS", default=None)
1130+
DMS_REPROCESS_FAILED_ACCOUNT_EVENTS = config("DMS_REPROCESS_FAILED_ACCOUNT_EVENTS", default=None)
11301131

11311132
PROD_DETAILS_CACHE_NAME = "product-details"
11321133
PROD_DETAILS_STORAGE = config(

kitsune/users/management/commands/process_account_events.py renamed to kitsune/users/management/commands/reprocess_failed_account_events.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,18 @@
44

55

66
class Command(BaseCommand):
7-
help = "Process all unprocessed account events created within the given past number of days."
7+
help = "Process all unprocessed account events created within the given number of hours."
88

99
def add_arguments(self, parser):
1010
parser.add_argument(
11-
"num_days_ago",
11+
"--within-hours",
1212
type=int,
13+
default=24,
1314
help=(
14-
"The past number of days within which the "
15-
"unprocessed account events have been created."
15+
"The number of hours within which the unprocessed "
16+
"account events have been created."
1617
),
1718
)
1819

1920
def handle(self, *args, **options):
20-
process_unprocessed_account_events.delay(options["num_days_ago"])
21+
process_unprocessed_account_events.delay(options["within_hours"])

kitsune/users/tasks.py

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,26 @@
33

44
import waffle
55
from celery import shared_task
6+
from django.db import transaction
67

78
from kitsune.products.models import Product
8-
from kitsune.sumo.decorators import skip_if_read_only_mode
99
from kitsune.users.auth import FXAAuthBackend
1010
from kitsune.users.models import AccountEvent
1111
from kitsune.users.utils import anonymize_user, delete_user_pipeline
1212

1313
shared_task_with_retry = shared_task(
14-
acks_late=True, autoretry_for=(Exception,), retry_backoff=2, retry_kwargs=dict(max_retries=4)
14+
acks_late=True, autoretry_for=(Exception,), retry_backoff=2, retry_kwargs=dict(max_retries=3)
1515
)
1616

1717

1818
@shared_task_with_retry
19-
@skip_if_read_only_mode
19+
@transaction.atomic
2020
def process_event_delete_user(event_id):
21-
event = AccountEvent.objects.get(id=event_id)
21+
try:
22+
event = AccountEvent.objects.get(id=event_id, status=AccountEvent.UNPROCESSED)
23+
except AccountEvent.DoesNotExist:
24+
return
25+
2226
user = event.profile.user
2327
event.profile = None
2428
event.save(update_fields=["profile"])
@@ -33,9 +37,13 @@ def process_event_delete_user(event_id):
3337

3438

3539
@shared_task_with_retry
36-
@skip_if_read_only_mode
40+
@transaction.atomic
3741
def process_event_subscription_state_change(event_id):
38-
event = AccountEvent.objects.get(id=event_id)
42+
try:
43+
event = AccountEvent.objects.get(id=event_id, status=AccountEvent.UNPROCESSED)
44+
except AccountEvent.DoesNotExist:
45+
return
46+
3947
body = json.loads(event.body)
4048

4149
last_event = AccountEvent.objects.filter(
@@ -60,9 +68,13 @@ def process_event_subscription_state_change(event_id):
6068

6169

6270
@shared_task_with_retry
63-
@skip_if_read_only_mode
71+
@transaction.atomic
6472
def process_event_password_change(event_id):
65-
event = AccountEvent.objects.get(id=event_id)
73+
try:
74+
event = AccountEvent.objects.get(id=event_id, status=AccountEvent.UNPROCESSED)
75+
except AccountEvent.DoesNotExist:
76+
return
77+
6678
body = json.loads(event.body)
6779

6880
change_time = datetime.utcfromtimestamp(body["changeTime"] / 1000.0)
@@ -79,9 +91,13 @@ def process_event_password_change(event_id):
7991

8092

8193
@shared_task_with_retry
82-
@skip_if_read_only_mode
94+
@transaction.atomic
8395
def process_event_profile_change(event_id):
84-
event = AccountEvent.objects.get(id=event_id)
96+
try:
97+
event = AccountEvent.objects.get(id=event_id, status=AccountEvent.UNPROCESSED)
98+
except AccountEvent.DoesNotExist:
99+
return
100+
85101
refresh_token = event.profile.fxa_refresh_token
86102

87103
fxa = FXAAuthBackend()
@@ -100,15 +116,15 @@ def process_event_profile_change(event_id):
100116

101117

102118
@shared_task
103-
def process_unprocessed_account_events(days):
119+
def process_unprocessed_account_events(within_hours):
104120
"""
105121
Attempt to process all unprocessed account events that have been
106-
created within the past "days" number of days.
122+
created within the given number of hours.
107123
"""
108-
days_ago = datetime.now() - timedelta(days=days)
124+
hours_ago = datetime.now() - timedelta(hours=within_hours)
109125

110126
for event in AccountEvent.objects.filter(
111-
status=AccountEvent.UNPROCESSED, created_at__gte=days_ago
127+
status=AccountEvent.UNPROCESSED, created_at__gte=hours_ago
112128
):
113129
match event.event_type:
114130
case AccountEvent.DELETE_USER:

kitsune/users/tests/test_tasks.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
import json
22
from datetime import datetime
3+
from unittest.mock import patch
34

45
from django.conf import settings
56
from django.contrib.auth.models import User
7+
from django.db import DatabaseError
68
from waffle.testutils import override_switch
79

810
from kitsune.messages.utils import send_message
@@ -15,6 +17,7 @@
1517
process_event_subscription_state_change,
1618
)
1719
from kitsune.users.tests import AccountEventFactory, GroupFactory, ProfileFactory, UserFactory
20+
from kitsune.wiki.tests import ApprovedRevisionFactory
1821

1922

2023
class AccountEventsTasksTestCase(TestCase):
@@ -67,6 +70,39 @@ def test_process_delete_user(self):
6770

6871
self.assertEqual(account_event.status, AccountEvent.PROCESSED)
6972

73+
@override_switch("enable-account-deletion", active=True)
74+
def test_process_delete_user_atomicity(self):
75+
"""Ensure that the processing of the delete user event is atomic."""
76+
profile = ProfileFactory()
77+
account_event = AccountEventFactory(
78+
body=json.dumps({}),
79+
event_type=AccountEvent.DELETE_USER,
80+
status=AccountEvent.UNPROCESSED,
81+
profile=profile,
82+
)
83+
rev = ApprovedRevisionFactory(creator=profile.user)
84+
85+
def event_save(*args, **kwargs):
86+
event_save.call_count += 1
87+
if event_save.call_count > 1:
88+
raise DatabaseError()
89+
return super(AccountEvent, account_event).save(*args, **kwargs)
90+
91+
event_save.call_count = 0
92+
93+
with patch("kitsune.users.tasks.AccountEvent.save") as event_save_mock:
94+
event_save_mock.side_effect = event_save
95+
with self.assertRaises(DatabaseError):
96+
process_event_delete_user(account_event.id)
97+
98+
rev.refresh_from_db()
99+
account_event.refresh_from_db()
100+
101+
self.assertEqual(account_event.profile, profile)
102+
self.assertEqual(account_event.status, AccountEvent.UNPROCESSED)
103+
self.assertEqual(rev.creator.username, profile.user.username)
104+
self.assertTrue(User.objects.filter(id=profile.user.id).exists())
105+
70106
def test_process_subscription_state_change(self):
71107
product_1 = ProductFactory(codename="capability_1")
72108
product_2 = ProductFactory(codename="capability_2")
@@ -148,6 +184,36 @@ def test_process_subscription_state_change_out_of_order(self):
148184
account_event_3.refresh_from_db()
149185
self.assertEqual(account_event_3.status, AccountEvent.IGNORED)
150186

187+
def test_process_subscription_state_change_atomicity(self):
188+
"""Ensure that the processing of the subscription state change is atomic."""
189+
ProductFactory(codename="capability_1")
190+
ProductFactory(codename="capability_2")
191+
product = ProductFactory(codename="capability_3")
192+
profile = ProfileFactory()
193+
profile.products.add(product)
194+
account_event = AccountEventFactory(
195+
body=json.dumps(
196+
{
197+
"capabilities": ["capability_1", "capability_2"],
198+
"isActive": True,
199+
"changeTime": 1,
200+
}
201+
),
202+
event_type=AccountEvent.SUBSCRIPTION_STATE_CHANGE,
203+
status=AccountEvent.UNPROCESSED,
204+
profile=profile,
205+
)
206+
207+
with patch("kitsune.users.tasks.AccountEvent.save") as event_save_mock:
208+
event_save_mock.side_effect = DatabaseError()
209+
with self.assertRaises(DatabaseError):
210+
process_event_subscription_state_change(account_event.id)
211+
212+
account_event.refresh_from_db()
213+
214+
self.assertEqual(account_event.status, AccountEvent.UNPROCESSED)
215+
self.assertEqual(list(p.codename for p in profile.products.all()), ["capability_3"])
216+
151217
def test_process_password_change(self):
152218
profile = ProfileFactory()
153219
account_event_1 = AccountEventFactory(
@@ -179,3 +245,24 @@ def test_process_password_change(self):
179245

180246
self.assertEqual(profile.fxa_password_change, datetime.utcfromtimestamp(2))
181247
self.assertEqual(account_event_2.status, AccountEvent.IGNORED)
248+
249+
def test_process_password_change_atomicity(self):
250+
"""Ensure that the processing of the password change is atomic."""
251+
profile = ProfileFactory()
252+
account_event = AccountEventFactory(
253+
body=json.dumps({"changeTime": 2000}),
254+
event_type=AccountEvent.PASSWORD_CHANGE,
255+
status=AccountEvent.UNPROCESSED,
256+
profile=profile,
257+
)
258+
259+
with patch("kitsune.users.tasks.AccountEvent.save") as event_save_mock:
260+
event_save_mock.side_effect = DatabaseError()
261+
with self.assertRaises(DatabaseError):
262+
process_event_password_change(account_event.id)
263+
264+
profile.refresh_from_db()
265+
account_event.refresh_from_db()
266+
267+
self.assertIs(profile.fxa_password_change, None)
268+
self.assertEqual(account_event.status, AccountEvent.UNPROCESSED)

scripts/cron.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,28 @@ def job_cleanup_old_account_events():
399399
call_command("cleanup_old_account_events")
400400

401401

402+
# Every 4 hours, 15 minutes after the hour.
403+
@scheduled_job(
404+
"cron",
405+
month="*",
406+
day="*",
407+
hour="*/4",
408+
minute="15",
409+
max_instances=1,
410+
coalesce=True,
411+
skip=settings.READ_ONLY,
412+
)
413+
@babis.decorator(ping_after=settings.DMS_REPROCESS_FAILED_ACCOUNT_EVENTS)
414+
def job_reprocess_failed_account_events():
415+
"""
416+
Re-process any account events created within the past 24 hours that remain
417+
in the unprocessed state. Kicks off a Celery task that does the following:
418+
* Gathers all unprocessed account events created within the past 24 hours.
419+
* Kicks off a separate Celery task to reprocess each one.
420+
"""
421+
call_command("reprocess_failed_account_events --within-hours 24")
422+
423+
402424
def run():
403425
try:
404426
schedule.start()

0 commit comments

Comments
 (0)